package com.my.service.task.map;
import com.my.service.task.entity.CarrierInfo;
import com.my.service.task.util.CarrierUtils;
import com.my.service.task.util.HbaseUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;

public class CarrierMap implements MapFunction<String, CarrierInfo> {

    public CarrierInfo map(String s) throws Exception {
        if(StringUtils.isBlank(s)) return null;
        String[] userinfos = s.split(",");
        String userid = userinfos[0];
        String username = userinfos[1];
        String sex = userinfos[2];
        String telphone = userinfos[3];
        String email = userinfos[4];
        String age = userinfos[5];
        String registerTime = userinfos[6];
        // 0、pc端：1、移动端：2、小程序端
        String usetype = userinfos[7];
        String carrierFlag = CarrierUtils.getCarrierByTel(telphone);
        String tablename = "userflaginfo";
        String rowkey = userid;
        String familyname = "baseinfo";
        String column = "carrierinfo";
        String data = null;
        if(carrierFlag.equals("0")){
            data = "未知运营商";
        }
        else if(carrierFlag.equals("1")){
            data = "移动用户";
        }
        else if(carrierFlag.equals("2")){
            data = "联通用户";
        }
        else{
            data = "电信用户";
        }
        HbaseUtils.putdata(tablename, rowkey, familyname, column, data);
        CarrierInfo cifo = new CarrierInfo();
        String groupfield = "carrierflag == " + carrierFlag;
        cifo.setGroupfield(groupfield);
        cifo.setCarrier(data);
        cifo.setCount(1L);
        return cifo;
    }
}
